# 该文件为测试套执行的主要流程
# encoding=utf-8

import os
import time
import platform
from constants import *
from json.decoder import JSONDecoder
from utils import command_execute, parse_json, Logger, now_date_time, CommandExecute


class ExecuteTestsHandler():
    def __init__(self, cmd_exe: CommandExecute, config: JSONDecoder, coverage: bool, subsystem='ace_engine'):
        self.config = config
        self.code_path = self.config['personal']['code']['path']
        self.subsystem = subsystem
        self.sheild_dir = ""
        self.total_suites_count = 0
        self.total_tests_count = 0
        self.total_failed_tests_count = 0
        self.total_passed_tests_count = 0
        self.total_unavailable_suite_count = 0
        self.total_abnormal_suites_count = 0
        self.unavailable_test_suite_result = []
        self.abnormal_test_suite_result = []
        self.failed_test_suite_result = []
        self.cost_time = 0
        self.start_time = ""
        self.coverage = coverage
        self.cmd_exe = cmd_exe

    def _mkdir(self, path):
        isExits = os.path.exists(path)
        if not isExits:
            os.makedirs(path)
            Logger.info(f"path {path} make successfully")
        else:
            Logger.info(f"path {path} already exits")

    def record_parse_log_result(self, parse_result, test_suite_name):
        state = parse_result['state']
        if state == 3:
            # undefined
            self.abnormal_test_suite_result.append(parse_result)
            self.total_abnormal_suites_count = self.total_abnormal_suites_count + 1
            Logger.error(
                f"[{test_suite_name}] test suite is in an undefined state, please check the result promptly")
        elif state == 2:
            # failed
            self.failed_test_suite_result.append(parse_result)
            total_count = parse_result['total_count']
            failed_count = parse_result['failed_count']
            passed_count = parse_result['passed_count']
            self.total_tests_count = self.total_tests_count + total_count
            self.total_failed_tests_count = self.total_failed_tests_count + failed_count
            self.total_passed_tests_count = self.total_passed_tests_count + passed_count
            Logger.warning(
                f"[{test_suite_name}] partial failure, total_count is {total_count} tests,\
                     failed_count is {failed_count} tests, passed_count is {passed_count} tests")
        elif state == 1:
            # passed
            total_count = parse_result['total_count']
            passed_count = parse_result['passed_count']
            self.total_tests_count = self.total_tests_count + total_count
            self.total_passed_tests_count = self.total_passed_tests_count + passed_count
            Logger.info(
                f"[{test_suite_name}] execute successfully,\
                total_count is {total_count} tests, passed_count is {passed_count} tests")
        elif state == 0:
            # signal
            self.unavailable_test_suite_result.append(parse_result)
            self.total_unavailable_suite_count = self.total_unavailable_suite_count + 1
            signal = parse_result['signal']
            Logger.error(f"[{test_suite_name}] test suite is in {signal}")

    def parse_log(self, lines, test_suite_name, print_log=False):
        """
        解析log的核心逻辑 比较固定的解析方法
        result:
            - state: [0, 1, 2, 3]
                - 0表示SIGNAL
                - 1表示PASSED
                - 2表示FAILED
                - 3表示UNDEFINED
        """
        if print_log:
            for line in lines:
                Logger.debug(line.strip('\n'))

        # 直接先读取最后一行
        last_line = lines[-1].strip('\n')
        # 是否含有Mutex的标记
        MutexSign = False
        if "Mutex" in last_line:
            last_line = lines[-2].strip('\n')
            MutexSign = True

        signals = ["Signal 4", "Signal 6", "Signal 7", "Signal 11"]
        result = {
            'suite_name': test_suite_name,
            'state': 3,
            'signal': "",
            'signal_log': [],
            'abnormal_log': "",
            'total_count': 0,
            'failed_count': 0,
            'passed_count': 0,
            'failed_log': {
                'name': [],
                'time': [],
                'log': []
            }
        }

        # 如果最后一行是Signal
        t_line = last_line.strip()
        if t_line in signals:
            result['state'] = 0
            result['signal'] = t_line
            result['signal_log'] = lines
            return result

        # 如果最后一行是PASSED
        if "[  PASSED  ]" in last_line:
            split_res = last_line.split(" ")
            total_count = int(split_res[5])
            result['state'] = 1
            result['total_count'] = total_count
            result['passed_count'] = total_count
            return result

        if "FAILED TEST SUITE" in last_line.strip():
            result['state'] = 3
            result['abnormal_log'] = lines
            return result

        # 如果最后一行是FAILED TESTS
        if "FAILED TESTS" in last_line.strip() or "FAILED TEST" in last_line.strip():
            lastIndex = -1
            total_test_line = ""
            for i in range(20):
                if "====" in lines[lastIndex]:
                    total_test_line = lines[lastIndex]
                    break
                lastIndex = lastIndex - 1
            result['state'] = 2
            # 获取到失败
            split_res = last_line.split(" ")
            failed_test_count = int(split_res[1])
            # 获取到总用例数
            total_res = total_test_line.split(" ")
            total_count = int(total_res[1])
            passed_count = total_count - failed_test_count
            result['total_count'] = total_count
            result['failed_count'] = failed_test_count
            result['passed_count'] = passed_count
            # 获取到失败用例名
            t_count = failed_test_count
            failed_tests_name = []
            while t_count > 0:
                if MutexSign:
                    split_res = lines[-3-t_count].strip('\n').split(' ')
                else:
                    split_res = lines[-2-t_count].strip('\n').split(' ')
                failed_tests_name.append(split_res[5])
                t_count = t_count - 1
            for failed_test_name in failed_tests_name:
                # 遍历此时的lines，第一次找到failed_test_name后停止并记录此时的坐标idx
                t_name = "[ RUN      ] " + failed_test_name
                sIdx = 0
                eIdx = 0
                for idx in range(len(lines)):
                    line = lines[idx].strip('\n')
                    if t_name == line:
                        sIdx = idx + 1
                        # 从sIdx开始向后找直到遇到failed_test_name
                        while failed_test_name not in lines[idx+1].strip('\n'):
                            idx = idx + 1
                        eIdx = idx + 1
                        break
                # 这里比较复杂 可以理解为先从lines的eIdx获取到["(2" --解析--> 2]
                time = int((lines[eIdx].split(' ')[-2])[1:])
                result['failed_log']['name'].append(failed_test_name)
                result['failed_log']['time'].append(time)
                result['failed_log']['log'].append(lines[sIdx:eIdx])
            return result

        if result['state'] == 3:
            result['abnormal_log'] = lines

        return result

    def execute_single_test(self, test_suite_name):
        command_execute(HDC_CLOCK)
        command_execute(HDC_DELETE_FAULTLOG)
        command_execute(HDC_DELETE_TEMP)
        command_execute(HDC_DELETE_DEBUG)
        command_execute(HDC_DELETE_FAULTLOGGER)
        command_execute(HDC_HILOG_W_STOP)
        command_execute(HDC_HILOG_CLEAR)
        command_execute(HDC_DELETE_TEST)
        command_execute(HDC_CREATE_TEST)
        command_execute(
            f'cd testsuites && {HDC_FILE_SEND} {test_suite_name} /data/test')
        gcov_prefiex_length = len(self.code_path.split('/')) + 1
        gcov_cmd = f"hdc shell \"cd /data/test && rm -f {test_suite_name}.xml && chmod +x * && GCOV_PREFIX=. GCOV_PREFIX_STRIP={gcov_prefiex_length} ./{test_suite_name}\""
        result = command_execute(gcov_cmd)
        for line in result:
            Logger.debug(line.strip('\n'))
        parse_result = self.parse_log(result, test_suite_name)
        self.record_parse_log_result(parse_result, test_suite_name)
        result = command_execute(HDC_FIND_OBJ)
        if len(result) != 0 and self.coverage:
            result = result[0].strip('\n')
            command_execute(f"hdc shell tar -zcf /data/test/obj.tar.gz -C {result} .")
            command_execute(HDC_FILE_RECV_OBJ)
            self.cmd_exe.sftp_put("temp/obj.tar.gz", f"{self.code_path}/out/rk3568/")
            try:
                # 调用put方法，可能会引发OSError异常
                self.cmd_exe.sftp_put("temp/obj.tar.gz", f"{self.code_path}/out/rk3568/")
            except OSError as e:
                # 在这里处理OSError异常
                Logger.error(f"An OSError occurred: {e}")
            self.cmd_exe.execute_no_log(f"cd {self.code_path}/out/rk3568 && tar -zxmf obj.tar.gz")
        else:
            Logger.error(f"{test_suite_name} coverage file gcda not generated")

    def execute_all_tests(self):
        self.start_time = now_date_time()
        start = time.perf_counter()
        for _, _, files in os.walk('testsuites'):
            for file_name in files:
                self.total_suites_count = self.total_suites_count + 1
                self.execute_single_test(file_name)
        end = time.perf_counter()
        self.cost_time = end - start

    def generate_sheild_dir(self):
        shield_list = self.config['subsystem']['sheild_paths']
        for shield in shield_list:
            self.sheild_dir = self.sheild_dir + " " + shield
            Logger.info(f"Blocking rules ==> {shield}")

    def gernerate_coverage_html(self):
        llvm_gcov_path = self.config['personal']['code']['gcov']
        coverage_path = "coverage"
        self._mkdir(coverage_path)
        ace_engine_path = f"{self.code_path}/out/rk3568/obj/foundation/arkui/ace_engine"
        cd_path = f"cd {ace_engine_path}"
        lcov_cmd1 = f"lcov -d . -o cov_ace_engine.info -c --gcov-tool {llvm_gcov_path} --rc lcov_branch_coverage=1"
        lcov_cmd2 = f"lcov -r cov_ace_engine.info {self.sheild_dir} -o cov_ace_engine.info --rc lcov_branch_coverage=1"
        gen_html = f"genhtml -o {coverage_path} --ignore-errors source cov_ace_engine.info --branch-coverage"
        self.cmd_exe.execute_no_log(f"{cd_path} && {lcov_cmd1}")
        Logger.info("LCOV generate cov_ace_engine_info successfully")
        self.cmd_exe.execute_no_log(f"{cd_path} && {lcov_cmd2}")
        Logger.info("LCOV remove extra file successfully")
        self.cmd_exe.execute_no_log(f"{cd_path} && {gen_html}")
        Logger.info("LCOV generate html successfully")

    def handle_log(self, log):
        result = ""
        for line in log:
            line = line.replace(" ", "&nbsp;").strip('\n') + "<br>"
            result = result + line
        return result

    def generate_html(self, exec_info, abnormal, unavailable, failed):
        system_name = platform.system()
        start_time = exec_info['start_time']
        total_suites_count = exec_info['total_suites_count']
        total_unavailable_suite_count = exec_info['total_unavailable_suite_count']
        total_abnormal_suites_count = exec_info['total_abnormal_suites_count']
        total_run_suites_count = total_suites_count - \
            (total_unavailable_suite_count + total_abnormal_suites_count)
        total_tests_count = exec_info['total_tests_count']
        total_passed_tests_count = exec_info['total_passed_tests_count']
        total_failed_tests_count = exec_info['total_failed_tests_count']
        cost_time = round(exec_info['cost_time'], 0)

        # 写html的头部
        self._mkdir('reports')
        failure_html = open("reports/failures.html", 'w')
        failure_html.write("<html version=\"1.0\" lang=\"en\">\n")
        failure_html.write("<head>\n")
        failure_html.write(
            "<meta http-equiv=\"Content-Type\" content=\"text/html; charset=utf-8\"/>\n")
        failure_html.write("<title>Failures Report</title>\n")
        failure_html.write(
            "<link href=\"../css/failures.css\" rel=\"stylesheet\" type=\"text/css\">\n")
        failure_html.write("</head>\n")
        failure_html.write("<body>\n")
        failure_html.write("<div class=\"container\">\n")

        # 写exec-info
        failure_html.write("<table class=\"exec-info\" id=\"summary\">\n")
        failure_html.write("<tr>\n")
        failure_html.write("<th colspan=\"4\">Test Summary</th>\n")
        failure_html.write("</tr>\n")
        failure_html.write("<tr>\n")
        failure_html.write("<td class=\"normal first\">Platform:</td>\n")
        failure_html.write("<td class=\"normal second\">RK3568</td>\n")
        failure_html.write("<td class=\"normal third\">Test Type:</td>\n")
        failure_html.write("<td class=\"normal fourth\">unittest</td>\n")
        failure_html.write("</tr>\n")
        failure_html.write("<tr>\n")
        failure_html.write("<td class=\"normal first\">Device Name:</td>\n")
        failure_html.write(
            "<td class=\"normal second\">7001005458************5a385e3900</td>\n")
        failure_html.write("<td class=\"normal third\">Host Info:</td>\n")
        failure_html.write(f"<td class=\"normal fourth\">{system_name}</td>\n")
        failure_html.write("</tr>\n")
        failure_html.write("<tr>\n")
        failure_html.write(
            "<td class=\"normal first\">Test Start Time:</td>\n")
        failure_html.write(f"<td class=\"normal second\">{start_time}</td>\n")
        failure_html.write("<td class=\"normal third\">Execution Time:</td>\n")
        failure_html.write(
            f"<td class=\"normal fourth\">{cost_time}&nbsp;second</td>\n")
        failure_html.write("</tr>\n")
        failure_html.write("</table>\n")

        # 写测试套的所有信息
        failure_html.write("<table class=\"summary\">\n")
        failure_html.write("<tr>\n")
        failure_html.write(
            f"<th class=\"normal modules color-normal\">{total_suites_count}</th>\n")
        failure_html.write(
            f"<th class=\"normal run-modules color-passed\">{total_run_suites_count}</th>\n")
        failure_html.write(
            f"<th class=\"normal total-tests color-normal\">{total_tests_count}</th>\n")
        failure_html.write(
            f"<th class=\"normal passed color-passed\">{total_passed_tests_count}</th>\n")
        failure_html.write(
            f"<th class=\"normal failed color-failed\">{total_failed_tests_count}</th>\n")
        failure_html.write(
            f"<th class=\"normal abnormal color-abnormal\">{total_abnormal_suites_count}</th>\n")
        failure_html.write(
            f"<th class=\"normal unavailable color-unavailable\">{total_unavailable_suite_count}</th>\n")
        failure_html.write("</tr>\n")
        failure_html.write("<tr>\n")
        failure_html.write("<td class=\"normal modules\">Modules</td>\n")
        failure_html.write(
            "<td class=\"normal run-modules\">Run Modules</td>\n")
        failure_html.write(
            "<td class=\"normal total-tests\">Total Tests</td>\n")
        failure_html.write("<td class=\"normal passed\">Passed</td>\n")
        failure_html.write("<td class=\"normal failed\">Failed</td>\n")
        failure_html.write("<td class=\"normal abnormal\">Abnormal</td>\n")
        failure_html.write(
            "<td class=\"normal unavailable\">Unavailable</td>\n")
        failure_html.write("</tr>\n")
        failure_html.write("</table>\n")

        # 写异常测试套
        for result in abnormal:
            test_suite_name = result['suite_name']
            # 查表找owner
            failure_html.write("<table class='failure-test'>\n")
            failure_html.write("<tr>\n")
            failure_html.write("<th class='title' colspan='4'>\n")
            failure_html.write(
                f"<span class='title'>{test_suite_name}&nbsp;&nbsp;&nbsp;</span>\n")
            failure_html.write("</th>\n")
            failure_html.write("</tr>\n")
            failure_html.write("<tr>\n")
            failure_html.write("<th class='normal test'>Test</th>\n")
            failure_html.write(
                "<th class='normal status'><div class='circle-normal circle-white'></div></th>\n")
            failure_html.write("<th class='normal result'>Result</th>\n")
            failure_html.write("<th class='normal details'>Details</th>\n")
            failure_html.write("</tr>\n")
            failure_html.write("<tr>\n")
            failure_html.write(
                f"<td class='normal test'>{test_suite_name}</td>\n")
            failure_html.write(
                "<td class='normal status'><div class='circle-normal circle-unavailable'></div></td>\n")
            failure_html.write("<td class='normal result'>abnormal</td>\n")
            failure_html.write(
                "<td class='normal details'>The test suite is in an abnormal state, \
                    possibly due to an issue with the log output. Please check it as soon as possible!</td>\n")
            failure_html.write("</tr>\n")
            failure_html.write("</table>\n")

        # 写崩溃测试套
        for result in unavailable:
            test_suite_name = result['suite_name']
            signal_log = self.handle_log(result['signal_log'])
            # 查表找owner
            failure_html.write("<table class='failure-test'>\n")
            failure_html.write("<tr>\n")
            failure_html.write("<th class='title' colspan='4'>\n")
            failure_html.write(
                f"<span class='title'>{test_suite_name}&nbsp;&nbsp;&nbsp;</span>\n")
            failure_html.write("</th>\n")
            failure_html.write("</tr>\n")
            failure_html.write("<tr>\n")
            failure_html.write("<th class='normal test'>Test</th>\n")
            failure_html.write(
                "<th class='normal status'><div class='circle-normal circle-white'></div></th>\n")
            failure_html.write("<th class='normal result'>Result</th>\n")
            failure_html.write("<th class='normal details'>Details</th>\n")
            failure_html.write("</tr>\n")
            failure_html.write("<tr>\n")
            failure_html.write(
                f"<td class='normal test'>{test_suite_name}</td>\n")
            failure_html.write(
                "<td class='normal status'><div class='circle-normal circle-unavailable'></div></td>\n")
            failure_html.write("<td class='normal result'>unavailable</td>\n")
            failure_html.write(
                f"<td class='normal details'>{signal_log}</td>\n")
            failure_html.write("</tr>\n")
            failure_html.write("</table>\n")

        # 写失败测试套
        for result in failed:
            test_suite_name = result['suite_name']
            # 写测试套的头
            failure_html.write("<table class='failure-test'>\n")
            failure_html.write("<tr>\n")
            failure_html.write("<th class='title' colspan='4'>\n")
            failure_html.write(
                f"<span class='title'>{test_suite_name}&nbsp;&nbsp;&nbsp;</span>\n")
            failure_html.write("</th>\n")
            failure_html.write("</tr>\n")
            failure_html.write("<tr>\n")
            failure_html.write("<th class='normal test'>Test</th>\n")
            failure_html.write(
                "<th class='normal status'><div class='circle-normal circle-white'></div></th>\n")
            failure_html.write("<th class='normal result'>Result</th>\n")
            failure_html.write("<th class='normal details'>Details</th>\n")
            failure_html.write("</tr>\n")

            # 写所有测试用例失败的信息
            failed_log = result['failed_log']
            test_name = failed_log['name']
            test_log = failed_log['log']

            for idx in range(len(test_name)):
                log = self.handle_log(test_log[idx])
                name = test_name[idx]
                failure_html.write("<tr>\n")
                failure_html.write(f"<td class='normal test'>{name}</td>\n")
                failure_html.write(
                    "<td class='normal status'><div class='circle-normal circle-failed'></div></td>\n")
                failure_html.write("<td class='normal result'>failed</td>\n")
                failure_html.write(f"<td class='normal details'>{log}</td>\n")
                failure_html.write("</tr>\n")
            # 写测试套的尾
            failure_html.write("</table>\n")

        # 写html的结尾
        failure_html.write("</div>\n")
        failure_html.write("</body>\n")
        failure_html.write("</html>\n")
        failure_html.close()
        Logger.info("Generate failures html for execution results")

    def gernerate_suite_execute_html(self):
        """
        产生用例执行报告
        """
        Logger.info(f"Total number of test suites: {self.total_suites_count}")
        Logger.info(
            f"Total number of unavailable test suites: {self.total_unavailable_suite_count}")
        Logger.info(
            f"Total number of abnormal test suites: {self.total_abnormal_suites_count}")
        Logger.info(f"Total number of test cases: {self.total_tests_count}")
        Logger.info(
            f"Total number of test cases failed: {self.total_failed_tests_count}")
        Logger.info(
            f"Total test cases execute cost time: {self.cost_time} seconds")
        exec_info = {}
        exec_info['total_suites_count'] = self.total_suites_count
        exec_info['total_unavailable_suite_count'] = self.total_unavailable_suite_count
        exec_info['total_abnormal_suites_count'] = self.total_abnormal_suites_count
        exec_info['total_tests_count'] = self.total_tests_count
        exec_info['total_passed_tests_count'] = self.total_passed_tests_count
        exec_info['total_failed_tests_count'] = self.total_failed_tests_count
        exec_info['cost_time'] = self.cost_time
        exec_info['start_time'] = self.start_time
        self.generate_html(exec_info, self.abnormal_test_suite_result,
                           self.unavailable_test_suite_result, self.failed_test_suite_result)

    def process(self):
        self.execute_all_tests()
        if self.coverage:
            self.generate_sheild_dir()
            self.gernerate_coverage_html()
        self.gernerate_suite_execute_html()
